package com.itcast.hadoop.areapartition;/**
 * Created by Administrator on 2019/4/4 0004.
 */

import com.itcast.hadoop.flowsum.FlowBean;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

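/**
 * @author ydf
 * @com kt
 * @create 2019-04-04 3:12 PM
 * Computes traffic statistics from the raw traffic logs and writes the results for users
 * of different provinces to different output files.
 * Two mechanisms have to be customized for this:
 * 1. The partitioning logic, by supplying a custom partitioner.
 * 2. The number of concurrent reducer tasks.
 **/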
public class FlowSumArea {

    /**
     * Mapper that parses one raw log line and emits {@code (phone number, FlowBean)}.
     *
     * <p>Sample input lines (the code splits them on tab characters):
     * <pre>
     * 1363157983019   13719199419     68-A1-B7-03-07-B1:CMCC-EASY     120.196.100.82                  4       0       240     0       200
     * 1363157984041   13660577991     5C-0E-8B-92-5C-20:CMCC-EASY     120.197.40.4    s19.cnzz.com    站点统计        24      9       6960    690     200
     * </pre>
     *
     * <p>As the first sample shows, some middle columns can be empty. Because
     * {@code StringUtils.split} treats adjacent separators as one, empty columns vanish from
     * the resulting array and the remaining columns shift left. The flow columns are therefore
     * located relative to the END of the row: the last column is taken to be the status code,
     * preceded by the downstream and upstream byte counts.
     * NOTE(review): this assumes the status column is always present — confirm against the data.
     */
    public static class FlowSumAreaMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        /**
         * Fewest columns a usable row has after empty columns are collapsed; matches the
         * shortest sample row above (both optional columns empty).
         */
        private static final int MIN_FIELDS = 9;

        /** Counter group / name used to report lines that were skipped as unparseable. */
        private static final String COUNTER_GROUP = "FlowSumArea";
        private static final String MALFORMED_LINES = "MALFORMED_LINES";

        /**
         * Parses a single log line and writes the phone number together with its flow figures.
         * Lines with too few columns or non-numeric flow values are skipped and counted
         * instead of failing the whole task.
         *
         * @param key     input key supplied by the input format (unused)
         * @param value   one line of the raw log
         * @param context task context used to emit output and to update counters
         * @throws IOException          if writing the output record fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = StringUtils.split(line, "\t");
            if (fields == null || fields.length < MIN_FIELDS) {
                context.getCounter(COUNTER_GROUP, MALFORMED_LINES).increment(1);
                return;
            }

            String phoneNB = fields[1];
            long up_flow;
            long d_flow;
            try {
                // Index from the end: collapsed empty columns shift everything after them,
                // so fixed indices from the front pick the wrong columns on some rows.
                up_flow = Long.parseLong(fields[fields.length - 3]);
                d_flow = Long.parseLong(fields[fields.length - 2]);
            } catch (NumberFormatException e) {
                context.getCounter(COUNTER_GROUP, MALFORMED_LINES).increment(1);
                return;
            }

            // Wrap the data as a key/value pair and emit it.
            context.write(new Text(phoneNB), new FlowBean(phoneNB, up_flow, d_flow));
        }
    }

    /**
     * Reducer that adds up the upstream and downstream flow of every record sharing a key
     * and emits one aggregated {@code FlowBean} per key.
     */
    public static class FlowSumAreaReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        /**
         * Sums the flow figures of all beans received for {@code key}.
         *
         * @param key     the phone number emitted by the mapper
         * @param values  all beans emitted for that phone number
         * @param context task context used to emit the aggregated record
         * @throws IOException          if writing the output record fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
            long up_flow_counter = 0;
            long d_flow_counter = 0;
            for (FlowBean bean : values) {
                up_flow_counter += bean.getUp_flow();
                d_flow_counter += bean.getD_flow();
            }
            context.write(key, new FlowBean(key.toString(), up_flow_counter, d_flow_counter));
        }
    }

    /**
     * Configures and submits the job, then exits with status 0 on success and 1 on failure.
     * Exits with status 2 and a usage message when the two path arguments are missing.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws IOException            if the job cannot be created or submitted
     * @throws ClassNotFoundException propagated from {@code Job.waitForCompletion}
     * @throws InterruptedException   if waiting for the job is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args == null || args.length < 2) {
            System.err.println("Usage: FlowSumArea <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowSumArea.class);

        job.setMapperClass(FlowSumAreaMapper.class);
        job.setReducerClass(FlowSumAreaReducer.class);
        // Plug in our custom partitioning logic.
        job.setPartitionerClass(AreaPartitioner.class);
        // Set the number of concurrent reducer tasks.
        // NOTE(review): presumably this has to cover every partition number AreaPartitioner
        // can return — verify against that class.
        job.setNumReduceTasks(6);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}
